import org.apache.commons.collections.IteratorUtils;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;

import java.util.List;

//不同店铺的销售额统计
public class RichMapFunction1 extends RichMapFunction<Order, List<Order>>
    {
        private MapState<Long, Order> mapState;

        @Override
        public void open(Configuration parameters) throws Exception
        {
            super.open(parameters);
            MapStateDescriptor<Long, Order> productRank = new MapStateDescriptor<Long, Order>("productRank", Long.class, Order.class);
            mapState = getRuntimeContext().getMapState(productRank);
        }



//        下面是核心逻辑
//        接收的参数是一个订单
        @Override
        public List<Order> map(Order order) throws Exception
        {
            if (mapState.contains(order.productId))//判断是否包含该key
            {
                Order acc = mapState.get(order.productId);
                order.sale += acc.sale;//根据productId得到售价,更新该货物当前总销售额
            }
            mapState.put(order.productId, order);
//mapState里面存放的是一个映射关系
//订单id与order对象(其实是为了存放销售额)


            return IteratorUtils.toList(mapState.values().iterator());
//            这里的values的意思就是返回一堆Order对象组成的List
        }
    }

